temp_dir.py 8.2 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274
  1. from __future__ import absolute_import
  2. import errno
  3. import itertools
  4. import logging
  5. import os.path
  6. import tempfile
  7. from contextlib import contextmanager
  8. from pip._vendor.contextlib2 import ExitStack
  9. from pip._vendor.six import ensure_text
  10. from pip._internal.utils.misc import enum, rmtree
  11. from pip._internal.utils.typing import MYPY_CHECK_RUNNING
  12. if MYPY_CHECK_RUNNING:
  13. from typing import Any, Dict, Iterator, Optional, TypeVar, Union
  14. _T = TypeVar('_T', bound='TempDirectory')
  15. logger = logging.getLogger(__name__)
  16. # Kinds of temporary directories. Only needed for ones that are
  17. # globally-managed.
  18. tempdir_kinds = enum(
  19. BUILD_ENV="build-env",
  20. EPHEM_WHEEL_CACHE="ephem-wheel-cache",
  21. REQ_BUILD="req-build",
  22. )
  23. _tempdir_manager = None # type: Optional[ExitStack]
  24. @contextmanager
  25. def global_tempdir_manager():
  26. # type: () -> Iterator[None]
  27. global _tempdir_manager
  28. with ExitStack() as stack:
  29. old_tempdir_manager, _tempdir_manager = _tempdir_manager, stack
  30. try:
  31. yield
  32. finally:
  33. _tempdir_manager = old_tempdir_manager
  34. class TempDirectoryTypeRegistry(object):
  35. """Manages temp directory behavior
  36. """
  37. def __init__(self):
  38. # type: () -> None
  39. self._should_delete = {} # type: Dict[str, bool]
  40. def set_delete(self, kind, value):
  41. # type: (str, bool) -> None
  42. """Indicate whether a TempDirectory of the given kind should be
  43. auto-deleted.
  44. """
  45. self._should_delete[kind] = value
  46. def get_delete(self, kind):
  47. # type: (str) -> bool
  48. """Get configured auto-delete flag for a given TempDirectory type,
  49. default True.
  50. """
  51. return self._should_delete.get(kind, True)
  52. _tempdir_registry = None # type: Optional[TempDirectoryTypeRegistry]
  53. @contextmanager
  54. def tempdir_registry():
  55. # type: () -> Iterator[TempDirectoryTypeRegistry]
  56. """Provides a scoped global tempdir registry that can be used to dictate
  57. whether directories should be deleted.
  58. """
  59. global _tempdir_registry
  60. old_tempdir_registry = _tempdir_registry
  61. _tempdir_registry = TempDirectoryTypeRegistry()
  62. try:
  63. yield _tempdir_registry
  64. finally:
  65. _tempdir_registry = old_tempdir_registry
  66. class _Default(object):
  67. pass
  68. _default = _Default()
  69. class TempDirectory(object):
  70. """Helper class that owns and cleans up a temporary directory.
  71. This class can be used as a context manager or as an OO representation of a
  72. temporary directory.
  73. Attributes:
  74. path
  75. Location to the created temporary directory
  76. delete
  77. Whether the directory should be deleted when exiting
  78. (when used as a contextmanager)
  79. Methods:
  80. cleanup()
  81. Deletes the temporary directory
  82. When used as a context manager, if the delete attribute is True, on
  83. exiting the context the temporary directory is deleted.
  84. """
  85. def __init__(
  86. self,
  87. path=None, # type: Optional[str]
  88. delete=_default, # type: Union[bool, None, _Default]
  89. kind="temp", # type: str
  90. globally_managed=False, # type: bool
  91. ):
  92. super(TempDirectory, self).__init__()
  93. if delete is _default:
  94. if path is not None:
  95. # If we were given an explicit directory, resolve delete option
  96. # now.
  97. delete = False
  98. else:
  99. # Otherwise, we wait until cleanup and see what
  100. # tempdir_registry says.
  101. delete = None
  102. if path is None:
  103. path = self._create(kind)
  104. self._path = path
  105. self._deleted = False
  106. self.delete = delete
  107. self.kind = kind
  108. if globally_managed:
  109. assert _tempdir_manager is not None
  110. _tempdir_manager.enter_context(self)
  111. @property
  112. def path(self):
  113. # type: () -> str
  114. assert not self._deleted, (
  115. "Attempted to access deleted path: {}".format(self._path)
  116. )
  117. return self._path
  118. def __repr__(self):
  119. # type: () -> str
  120. return "<{} {!r}>".format(self.__class__.__name__, self.path)
  121. def __enter__(self):
  122. # type: (_T) -> _T
  123. return self
  124. def __exit__(self, exc, value, tb):
  125. # type: (Any, Any, Any) -> None
  126. if self.delete is not None:
  127. delete = self.delete
  128. elif _tempdir_registry:
  129. delete = _tempdir_registry.get_delete(self.kind)
  130. else:
  131. delete = True
  132. if delete:
  133. self.cleanup()
  134. def _create(self, kind):
  135. # type: (str) -> str
  136. """Create a temporary directory and store its path in self.path
  137. """
  138. # We realpath here because some systems have their default tmpdir
  139. # symlinked to another directory. This tends to confuse build
  140. # scripts, so we canonicalize the path by traversing potential
  141. # symlinks here.
  142. path = os.path.realpath(
  143. tempfile.mkdtemp(prefix="pip-{}-".format(kind))
  144. )
  145. logger.debug("Created temporary directory: %s", path)
  146. return path
  147. def cleanup(self):
  148. # type: () -> None
  149. """Remove the temporary directory created and reset state
  150. """
  151. self._deleted = True
  152. if os.path.exists(self._path):
  153. # Make sure to pass unicode on Python 2 to make the contents also
  154. # use unicode, ensuring non-ASCII names and can be represented.
  155. rmtree(ensure_text(self._path))
  156. class AdjacentTempDirectory(TempDirectory):
  157. """Helper class that creates a temporary directory adjacent to a real one.
  158. Attributes:
  159. original
  160. The original directory to create a temp directory for.
  161. path
  162. After calling create() or entering, contains the full
  163. path to the temporary directory.
  164. delete
  165. Whether the directory should be deleted when exiting
  166. (when used as a contextmanager)
  167. """
  168. # The characters that may be used to name the temp directory
  169. # We always prepend a ~ and then rotate through these until
  170. # a usable name is found.
  171. # pkg_resources raises a different error for .dist-info folder
  172. # with leading '-' and invalid metadata
  173. LEADING_CHARS = "-~.=%0123456789"
  174. def __init__(self, original, delete=None):
  175. # type: (str, Optional[bool]) -> None
  176. self.original = original.rstrip('/\\')
  177. super(AdjacentTempDirectory, self).__init__(delete=delete)
  178. @classmethod
  179. def _generate_names(cls, name):
  180. # type: (str) -> Iterator[str]
  181. """Generates a series of temporary names.
  182. The algorithm replaces the leading characters in the name
  183. with ones that are valid filesystem characters, but are not
  184. valid package names (for both Python and pip definitions of
  185. package).
  186. """
  187. for i in range(1, len(name)):
  188. for candidate in itertools.combinations_with_replacement(
  189. cls.LEADING_CHARS, i - 1):
  190. new_name = '~' + ''.join(candidate) + name[i:]
  191. if new_name != name:
  192. yield new_name
  193. # If we make it this far, we will have to make a longer name
  194. for i in range(len(cls.LEADING_CHARS)):
  195. for candidate in itertools.combinations_with_replacement(
  196. cls.LEADING_CHARS, i):
  197. new_name = '~' + ''.join(candidate) + name
  198. if new_name != name:
  199. yield new_name
  200. def _create(self, kind):
  201. # type: (str) -> str
  202. root, name = os.path.split(self.original)
  203. for candidate in self._generate_names(name):
  204. path = os.path.join(root, candidate)
  205. try:
  206. os.mkdir(path)
  207. except OSError as ex:
  208. # Continue if the name exists already
  209. if ex.errno != errno.EEXIST:
  210. raise
  211. else:
  212. path = os.path.realpath(path)
  213. break
  214. else:
  215. # Final fallback on the default behavior.
  216. path = os.path.realpath(
  217. tempfile.mkdtemp(prefix="pip-{}-".format(kind))
  218. )
  219. logger.debug("Created temporary directory: %s", path)
  220. return path